# Imports
import os

import matplotlib.pyplot as plt
import numpy as np
import scipy.misc as m
from keras.callbacks import ModelCheckpoint
from keras.layers import *
from keras.models import Model, load_model
from keras.optimizers import RMSprop, Adam, SGD
from keras.regularizers import l2
def UNet(n):
    """Build a fully convolutional U-Net with ``n`` resolution levels.

    The input is an RGB image of arbitrary spatial size; the output has
    2 softmax channels per pixel (background / foreground).  Level ``i``
    uses ``16 * 2**i`` filters.
    """
    inp = Input(shape=(None, None, 3))
    # Index 0 holds the raw input so level i can read skips[i] / pools[i - 1].
    skips = [inp]
    pools = [inp]

    # Contracting path: conv -> dropout -> conv, then 2x2 max-pool.
    for level in range(1, n):
        filters = 16 * 2 ** level
        x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(pools[level - 1])
        x = Dropout(0.2)(x)
        x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(x)
        skips.append(x)
        pools.append(MaxPooling2D((2,2), data_format='channels_last')(x))

    # Bottleneck: two convolutions, no dropout.
    filters = 16 * 2 ** n
    x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(pools[n - 1])
    x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(x)
    skips.append(x)

    # Expanding path: upsample, concatenate the matching skip, then
    # conv -> dropout -> conv, mirroring the contracting path.
    for level in range(n - 1, 0, -1):
        filters = 16 * 2 ** level
        x = UpSampling2D(size=(2,2))(x)
        x = concatenate([x, skips[level]], axis=3)
        x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(x)
        x = Dropout(0.2)(x)
        x = Conv2D(filters, (3,3), activation='relu', data_format='channels_last', padding='same')(x)

    # 1x1 convolution + softmax gives a per-pixel class distribution.
    out = Conv2D(2, (1,1), activation='softmax', data_format='channels_last', padding='same')(x)
    return Model(inputs=inp, outputs=out)
# Build a 3-level U-Net and print its layer summary.
unet = UNet(3)
unet.summary()

# DRIVE retinal-vessel dataset layout: fundus images plus the first
# manual vessel annotation as ground truth.
dir_drive = "/home/augusto/tmp/datasets/drive/training/"
dir_img = dir_drive + "images/"
dir_seg = dir_drive + "1st_manual/"
dir_test = "/home/augusto/tmp/datasets/drive/test/"
dir_test_img = dir_test + "images/"
dir_test_seg = dir_test + "1st_manual/"

# Each split file lists "image_name gt_name" pairs, one pair per line.
with open(dir_drive + 'train.txt') as f:
    x = f.read().splitlines()
imgs = np.array(x)
with open(dir_drive + 'val.txt') as f:
    x = f.read().splitlines()
val_imgs = np.array(x)
with open(dir_test + 'test.txt') as f:
    x = f.read().splitlines()
test_imgs = np.array(x)
print(imgs[0])
def extract_patches(img, size):
    """Tile ``img`` into non-overlapping patches of spatial shape ``size``.

    Works for 2-D (grayscale) and 3-D (H, W, C) arrays.  Trailing rows and
    columns that do not fill a whole patch are discarded.  Patches are
    returned stacked in row-major (left-to-right, top-to-bottom) order.
    """
    patch_h, patch_w = size
    n_rows = img.shape[0] // patch_h
    n_cols = img.shape[1] // patch_w
    tiles = [
        img[r * patch_h:(r + 1) * patch_h, c * patch_w:(c + 1) * patch_w]
        for r in range(n_rows)
        for c in range(n_cols)
    ]
    return np.array(tiles)
def to_categorical(patches):
    """One-hot encode a stack of label patches.

    Parameters
    ----------
    patches : ndarray of shape (n, H, W)
        Integer/gray-level label maps; 0 means background, any value > 0
        means foreground.

    Returns
    -------
    ndarray of shape (n, H, W, 2), float
        Channel 0 is the background mask, channel 1 the foreground mask.
    """
    n, H, W = patches.shape
    segmentation = np.zeros((n, H, W, 2))
    # Vectorized over the whole stack instead of a per-patch Python loop;
    # output is identical, just computed in one pass per channel.
    segmentation[..., 0] = (patches == 0).astype(int)
    segmentation[..., 1] = (patches > 0).astype(int)
    return segmentation
# Load every training image and its vessel ground truth.
images = []
gt = []
for line in imgs:
    names = line.split(' ')
    # NOTE(review): scipy.misc.imread/imsave were removed in SciPy >= 1.2;
    # this script assumes an old SciPy with PIL installed — confirm env.
    images.append(m.imread(dir_img + names[0]))
    gt.append(m.imread(dir_seg + names[1]))
images = np.array(images)

# Cut each image / ground-truth pair into non-overlapping 32x32 patches,
# showing each pair as a visual sanity check.
patch_size = 32
X_train = []
y_train = []
for i, img in enumerate(images):
    seg_img = gt[i]
    X_train.append(extract_patches(img, (patch_size, patch_size)))
    y_patches = to_categorical(extract_patches(seg_img, (patch_size,patch_size)))
    y_train.append(y_patches)
    fig = plt.figure(figsize=(20,40))
    ax = fig.add_subplot(1,2,1)
    ax.imshow(img)
    ax.set_title("original image")
    ax = fig.add_subplot(1,2,2)
    ax.imshow(seg_img, cmap='gray')
    ax.set_title('ground truth')
    plt.show()

# Flatten the per-image patch stacks into single (N, 32, 32, C) arrays.
X_train = np.array(X_train)
X_train = np.reshape(X_train, (-1, patch_size,patch_size,3))
y_train = np.array(y_train)
y_train = np.reshape(y_train, (-1, patch_size, patch_size,2))
print(X_train.shape, y_train.shape)
# Same patch extraction for the validation split (no plotting here).
X_val = []
y_val = []
for line in val_imgs:
    names = line.split(' ')
    val_img = m.imread(dir_img + names[0])
    val_seg = m.imread(dir_seg + names[1])
    X_val.append(extract_patches(val_img, (patch_size, patch_size)))
    y_patches = to_categorical(extract_patches(val_seg, (patch_size,patch_size)))
    y_val.append(y_patches)
X_val, y_val = np.array(X_val), np.array(y_val)
X_val = np.reshape(X_val, (-1, patch_size, patch_size, 3))
y_val = np.reshape(y_val, (-1, patch_size, patch_size, 2))
print(X_val.shape, y_val.shape)

# Spot-check one training patch against its foreground mask (channel 1).
n = 100
fig = plt.figure(figsize=(5,10))
ax = fig.add_subplot(1,2,1)
ax.imshow(X_train[n][:,:])
ax.set_title('img')
ax = fig.add_subplot(1,2,2)
ax.imshow(y_train[n][:,:,1], cmap='gray')
ax.set_title('gt')
plt.show()
# Pixel-wise 2-class segmentation: softmax channels trained with
# binary cross-entropy.
opt = Adam(lr=1e-4)
unet.compile(loss='binary_crossentropy',
             optimizer=opt,
             metrics=['accuracy'])
# Keep only the weights with the best validation loss.
checkpoint = ModelCheckpoint('weights/unet_drive.hdf5', monitor='val_loss', verbose=1, save_best_only=True, mode='min')
# Training is commented out; a previously trained model is loaded instead.
# unet.fit(X_train, y_train,
#          validation_data=(X_val, y_val),
#          epochs=60, batch_size=8,
#          verbose=2,
#          callbacks=[checkpoint])
# unet.save('models/unet_drive.h5')
unet = load_model('models/unet_drive.h5')
# Test set: whole images, no patching — the network is fully convolutional
# so it accepts any spatial size.
X_test = []
y_test = []
for line in test_imgs:
    names = line.split()
    test_img = m.imread(dir_test_img + names[0])
    test_seg = m.imread(dir_test_seg + names[1])
    X_test.append(test_img)
    y_ing = to_categorical(np.array([test_seg]))
    y_test.append(y_ing)
y_test = np.array(y_test)
y_test = np.squeeze(y_test)
X_test = np.array(X_test)
print(X_test.shape, y_test.shape)

# Predict in two batches of 10 to limit memory use.  The last image column
# is dropped (:-1) — presumably so the width is divisible by the pooling
# factor; TODO confirm against the DRIVE image size (565 wide).
y_pred = unet.predict(X_test[:10,:,:-1,:])
y_pred2 = unet.predict(X_test[10:,:,:-1,:])
y_pred = np.concatenate((y_pred, y_pred2),axis=0)
# Per-pixel class index (0 = background, 1 = vessel); the ground truth is
# cropped the same way so the shapes match.
y_predi = np.argmax(y_pred, axis=3)
y_testi = np.argmax(y_test[:,:,:-1,:], axis=3)
print(y_testi.shape,y_predi.shape)

# Confusion-matrix counts over all test pixels, then precision,
# sensitivity (recall) and F-measure.
gtp = (y_testi == 1).astype(int)
pp = (y_predi == 1).astype(int)
gtn = (y_testi == 0).astype(int)
pn = (y_predi == 0).astype(int)
TP = (gtp*pp).sum()
TN = (gtn*pn).sum()
FP = (gtn*pp).sum()
FN = (gtp*pn).sum()
Precision = TP/(TP+FP)
Sensitivity = TP/(TP+FN)
FMeasure = (2 * Precision*Sensitivity)/(Precision + Sensitivity)
print(Precision, Sensitivity, FMeasure)
# Visualize the first 10 predictions next to image and ground truth,
# and save each predicted mask to disk.
shape = (584, 565)  # DRIVE image size (unused below)
n_classes= 1
for i in range(10):
    img_is = X_test[i]
    seg = y_predi[i]
    segtest = y_testi[i]
    fig = plt.figure(figsize=(20,40))
    ax = fig.add_subplot(1,3,1)
    ax.imshow(img_is, cmap='gray')
    ax.set_title("original")
    ax = fig.add_subplot(1,3,2)
    ax.imshow(seg, cmap='gray')
    ax.set_title("predicted class")
    ax = fig.add_subplot(1,3,3)
    ax.imshow(segtest, cmap='gray')
    ax.set_title("true class")
    plt.show()
    # m.imsave('images/drive/'+str(i)+'_orig.png', img_is)
    m.imsave('images/drive/'+str(i)+'_pred_unet.png', seg)
    # m.imsave('images/drive/'+str(i)+'_segm.png', segtest)
def calc_rates(pred, gt):
    """Count confusion-matrix entries for binarized label maps.

    Pixels with label >= 1 count as positive, label == 0 as negative.
    Returns the tuple (TP, TN, FP, FN) summed over all pixels.
    """
    positive_gt = (gt >= 1).astype(int)
    negative_gt = (gt == 0).astype(int)
    positive_pred = (pred >= 1).astype(int)
    negative_pred = (pred == 0).astype(int)
    true_pos = (positive_gt * positive_pred).sum()
    true_neg = (negative_gt * negative_pred).sum()
    false_pos = (negative_gt * positive_pred).sum()
    false_neg = (positive_gt * negative_pred).sum()
    return true_pos, true_neg, false_pos, false_neg
def precision(pred, gt):
    """Precision = TP / (TP + FP) over binarized pixel labels."""
    true_pos, _tn, false_pos, _fn = calc_rates(pred, gt)
    return true_pos / (true_pos + false_pos)
def recall(pred, gt):
    """Recall (sensitivity) = TP / (TP + FN) over binarized pixel labels."""
    true_pos, _tn, _fp, false_neg = calc_rates(pred, gt)
    return true_pos / (true_pos + false_neg)
def FM(prediction, ground_truth):
    """F-measure (F1): harmonic mean of precision and recall."""
    p = precision(prediction, ground_truth)
    r = recall(prediction, ground_truth)
    return (2 * p * r) / (p + r)
# Recompute the metrics through the helper functions; should match the
# inline TP/FP computation printed earlier.
test_gt = y_testi
P = precision(y_predi, test_gt)
R = recall(y_predi, test_gt)
F = FM(y_predi, test_gt)
print(P, R, F)
# DIBCO document-binarization dataset: same split-file format and the
# same patch pipeline as DRIVE.
dir_dibco = "/home/augusto/tmp/scratch/DIBCO/"
with open(dir_dibco + 'train.txt') as f:
    x = f.read().splitlines()
imgs = np.array(x)
with open(dir_dibco + 'val.txt') as f:
    x = f.read().splitlines()
val_imgs = np.array(x)
with open(dir_dibco + 'test.txt') as f:
    x = f.read().splitlines()
test_imgs = np.array(x)
print(imgs[0])

# Force inputs to RGB and ground truth to 8-bit grayscale on load.
images = []
gt = []
for line in imgs:
    names = line.split()
    images.append(m.imread(dir_dibco + names[0], mode='RGB'))
    gt.append(m.imread(dir_dibco + names[1], mode='L'))
images = np.array(images)
gt = np.array(gt)

# 32x32 patch extraction with a per-pair visual check, as for DRIVE.
patch_size = 32
X_train = []
y_train = []
for i, img in enumerate(images):
    seg_img = gt[i]
    X_train.append(extract_patches(img, (patch_size, patch_size)))
    y_patches = to_categorical(extract_patches(seg_img, (patch_size, patch_size)))
    y_train.append(y_patches)
    fig = plt.figure(figsize=(20,40))
    ax = fig.add_subplot(1,2,1)
    ax.imshow(img)
    ax.set_title("original image")
    ax = fig.add_subplot(1,2,2)
    ax.imshow(seg_img, cmap='gray')
    ax.set_title('ground truth')
    plt.show()
print(len(X_train))